package com.bw.gmall.realtime.dws.app;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.base.BaseApp;
import com.bw.gmall.realtime.common.bean.UserRegisterBean;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.util.DateFormatUtil;
import com.bw.gmall.realtime.common.util.FlinkSinkUtil;
import com.bw.gmall.realtime.dws.function.DorisMapFunction;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/*
12.5 User domain: per-window summary table of user registrations
 */

/**
 * Flink job that counts user registrations per 10-second event-time window
 * and writes one summary row per window to Doris.
 *
 * <p>Pipeline: raw JSON strings from {@link Constant#TOPIC_DWD_USER_REGISTER}
 * are parsed into {@link UserRegisterBean}s carrying a count of 1, stamped with
 * event time taken from {@code create_time}, summed in a global tumbling window,
 * and sunk to the {@link Constant#DWS_USER_USER_REGISTER_WINDOW} table.
 */
public class DwsUserUserRegisterWindow extends BaseApp {
    public static void main(String[] args) {
        // NOTE(review): the two trailing numeric arguments are presumably parallelism and
        // port as defined by BaseApp.start — confirm against BaseApp.
        new DwsUserUserRegisterWindow().start(Constant.TOPIC_DWD_USER_REGISTER,
                Constant.DWS_USER_USER_REGISTER_WINDOW, 4, 10025);
    }

    /**
     * Wires the three stages of the job together.
     *
     * @param env              the execution environment supplied by {@link BaseApp} (not used here)
     * @param dataStreamSource raw records read from the source topic, one JSON document per element
     */
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
        // 1. Parse and clean the raw records.
        SingleOutputStreamOperator<UserRegisterBean> etlStream = etl(dataStreamSource);

        // 2. Assign watermarks, open the window and aggregate the registration count.
        SingleOutputStreamOperator<UserRegisterBean> reduceStream = getReduceStream(etlStream);

        // 3. Write the per-window result to Doris.
        reduceStream.map(new DorisMapFunction<>())
                .sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_REGISTER_WINDOW));
    }

    /**
     * Assigns event-time timestamps, groups all records into 10-second tumbling
     * windows and sums {@code registerCt} within each window.
     *
     * @param etlStream cleaned beans whose {@code curDate} still holds the raw {@code create_time}
     * @return one bean per window with {@code stt}, {@code edt}, {@code curDate} and the summed count
     */
    private static SingleOutputStreamOperator<UserRegisterBean> getReduceStream(SingleOutputStreamOperator<UserRegisterBean> etlStream) {
        // Tolerate up to 3 seconds of out-of-order arrival; event time comes from create_time,
        // which etl() temporarily stores in the curDate field.
        WatermarkStrategy<UserRegisterBean> watermarkStrategy = WatermarkStrategy
                .<UserRegisterBean>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserRegisterBean>() {
                    @Override
                    public long extractTimestamp(UserRegisterBean userRegisterBean, long recordTimestamp) {
                        return DateFormatUtil.dateTimeToTs(userRegisterBean.getCurDate());
                    }
                });

        return etlStream
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<UserRegisterBean>() {
                    @Override
                    public UserRegisterBean reduce(UserRegisterBean x, UserRegisterBean y) throws Exception {
                        // Accumulate into the first bean and reuse it as the running total.
                        x.setRegisterCt(x.getRegisterCt() + y.getRegisterCt());
                        return x;
                    }
                }, new AllWindowFunction<UserRegisterBean, UserRegisterBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<UserRegisterBean> iterable, Collector<UserRegisterBean> collector) throws Exception {
                        long start = timeWindow.getStart();
                        long end = timeWindow.getEnd();
                        // The ReduceFunction pre-aggregates, so the iterable holds the single reduced bean.
                        UserRegisterBean userRegisterBean = iterable.iterator().next();
                        userRegisterBean.setStt(DateFormatUtil.tsToDateTime(start));
                        userRegisterBean.setEdt(DateFormatUtil.tsToDateTime(end));
                        // Derive the date from the window itself rather than the wall clock, so the
                        // row stays consistent with stt/edt on replay, backfill or late firing.
                        userRegisterBean.setCurDate(DateFormatUtil.tsToDate(start));
                        collector.collect(userRegisterBean);
                    }
                });
    }

    /**
     * Parses each raw JSON record into a {@link UserRegisterBean} with a count of 1.
     *
     * <p>Records that cannot be parsed, are empty, or have no {@code create_time}
     * are dropped: the timestamp assigner downstream needs a usable
     * {@code create_time} and runs outside this method's error handling.
     *
     * @param dataStreamSource raw JSON strings
     * @return beans with {@code registerCt = 1} and {@code curDate} set to the record's {@code create_time}
     */
    private static SingleOutputStreamOperator<UserRegisterBean> etl(DataStreamSource<String> dataStreamSource) {
        return dataStreamSource.flatMap(new FlatMapFunction<String, UserRegisterBean>() {
            @Override
            public void flatMap(String s, Collector<UserRegisterBean> collector) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(s);
                    // parseObject yields null for null/empty input; treat that like an empty document.
                    if (jsonObject == null || jsonObject.isEmpty()) {
                        return;
                    }
                    String createTime = jsonObject.getString("create_time");
                    // Without create_time no event timestamp can be extracted later, so skip the record.
                    if (createTime == null || createTime.isEmpty()) {
                        return;
                    }
                    collector.collect(UserRegisterBean.builder().registerCt(1L).curDate(createTime).build());
                } catch (Exception e) {
                    // Best effort: a malformed record is reported and dropped instead of failing the job.
                    e.printStackTrace();
                }
            }
        });
    }
}
